package esl

import (
	"context"
	"errors"
	"fmt"
	"gitee.com/zackeus/go-boot/freeswitch/esl/command"
	"gitee.com/zackeus/go-boot/freeswitch/esl/events"
	"gitee.com/zackeus/go-boot/freeswitch/esl/headers"
	"gitee.com/zackeus/go-boot/freeswitch/esl/log"
	"gitee.com/zackeus/go-boot/freeswitch/esl/response"
	"gitee.com/zackeus/go-zero/core/logx"
	"gitee.com/zackeus/go-zero/core/trace"
	"gitee.com/zackeus/goutil/strutil"
	"github.com/google/uuid"
	cmap "github.com/orcaman/concurrent-map/v2"
	"go.opentelemetry.io/otel/codes"
	oteltrace "go.opentelemetry.io/otel/trace"
	"net"
	"strings"
	"sync"
	"time"
)

var (
	spanInboundName  = "esl.inbound"
	commonCloseError = errors.New("exit normally")
	disconnectError  = errors.New("esl disconnected")
)

type (
	InboundOption func(e *InboundEngine)

	InboundEngine struct {
		core              *Core
		cfg               *Config // Generic common options to both Inbound and Outbound Conn
		startOnce         sync.Once
		closeOnce         sync.Once
		mu                sync.Mutex
		ctx               context.Context
		stopFunc          func()
		network           string        // 网络类型应为 tcp、tcp4、tcp6
		address           string        // FS 连接地址
		password          string        // 连接验证密码. 默认 ClueCon
		authTimeout       time.Duration // 验证超时
		reconnectInterval time.Duration // 重连间隔
		jobChannels       cmap.ConcurrentMap[string, chan *response.RawResponse]
		eventListeners    map[string]map[string]events.EventListener
		onReconnected     func() // 重连回调
	}
)

// NewInbound - Connects to FreeSWITCH ESL on the address with the provided options. Returns the connection and any errors encountered
func NewInbound(address string, opts ...InboundOption) *InboundEngine {
	/* 构建上下文 */
	ctx, stop := context.WithCancel(context.Background())

	e := &InboundEngine{
		ctx:               ctx,
		stopFunc:          stop,
		network:           "tcp",
		address:           address,
		password:          "ClueCon",
		authTimeout:       5 * time.Second,
		reconnectInterval: 3 * time.Second,
		jobChannels:       cmap.New[chan *response.RawResponse](),
		eventListeners:    make(map[string]map[string]events.EventListener, 1),
	}
	e.eventListeners[events.EventBackRoundJob] = map[string]events.EventListener{"1": e.onBackgroundJob}
	e.cfg = &Config{
		eventConcurrentSize: 1000,
		exitTimeout:         5 * time.Second,
		eventListener:       e.onEvent,
	}

	for _, opt := range opts {
		opt(e)
	}
	return e
}

// Active 是否存活
func (e *InboundEngine) Active() bool {
	return e.core != nil && e.core.ctx.Err() == nil
}

func (e *InboundEngine) startInbound() error {
	c, err := net.Dial(e.network, e.address)
	if err != nil {
		return err
	}
	if e.core, err = NewCore(c, e.cfg); err != nil {
		return err
	}
	/* 清空 bgapi job 并注册回调事件 */
	e.jobChannels.Clear()

	select {
	case res := <-e.core.responseChannels[response.TypeRudeRejection]:
		/* 连接被拒绝 */
		e.core.shutdown()
		return errors.New(res.GetReply())
	case <-e.core.responseChannels[response.TypeAuthRequest]:
		/* 首次鉴权请求 */
	}

	authCtx, cancel := context.WithTimeout(e.core.ctx, e.authTimeout)
	err = e.doAuth(authCtx, command.Auth{Password: e.password})
	cancel()
	if err != nil {
		/* 认证失败 断开连接 */
		e.core.shutdown(true)
		return err
	} else {
		/* 认证成功 */
		log.Info("Successfully authenticated %s\n", e.core.conn.RemoteAddr())
	}
	/* 初始化订阅事件 */
	enableEvents := make([]string, 0, len(e.eventListeners))
	customEvents := make([]string, 0, len(e.eventListeners))
	for key, _ := range e.eventListeners {
		if strings.HasPrefix(key, events.EventCustom) {
			customEvents = append(customEvents, key)
		} else {
			enableEvents = append(enableEvents, key)
		}
	}
	/* CUSTOM 事件在队列最后 */
	enableEvents = append(enableEvents, customEvents...)
	if _, err = e.core.send(context.Background(), command.Event{Listen: enableEvents}); err != nil {
		e.core.shutdown(true)
		return err
	}

	// Inbound only handlers
	go e.authLoop(command.Auth{Password: e.password}, e.authTimeout)
	go e.disconnectLoop()
	return nil
}

// 鉴权
func (e *InboundEngine) doAuth(ctx context.Context, auth command.Auth) error {
	res, err := e.core.send(ctx, auth)
	if err != nil {
		return err
	}
	if !res.IsOk() {
		return fmt.Errorf("failed to auth %#v", res)
	}
	return nil
}

// 发送 bgapi 并同步等待响应
func (e *InboundEngine) sendBgApiSync(ctx context.Context, cmd *command.API, timeout time.Duration) (*response.RawResponse, error) {
	/* bgapi 同步等待执行结果 */
	cmd.JobUUID = uuid.New().String()
	/* 设置执行超时时间 */
	mCtx, cancel := context.WithTimeout(ctx, timeout)
	channel := e.registerJobChannel(cmd.JobUUID)
	defer func(uuid string) {
		cancel()
		/* 注销 Job 回调 */
		e.unRegisterJobChannel(uuid)
	}(cmd.JobUUID)

	/* 链路追踪 */
	tracer := trace.TracerFromContext(ctx)
	ctx, span := tracer.Start(ctx, spanInboundName, oteltrace.WithSpanKind(oteltrace.SpanKindConsumer))
	span.SetAttributes(eslCmdAttributeKey.String(cmd.BuildCmd()))
	defer span.End()

	if _, err := e.core.send(ctx, cmd); err != nil {
		return nil, err
	}

	select {
	case res := <-channel:
		span.SetStatus(codes.Ok, "")
		return res, nil
	case <-mCtx.Done():
		/* bgapi 等待回执超时 */
		err := errors.New(fmt.Sprintf("bgapi[%s] waiting for receipt timed out", cmd.BuildCmd()))
		span.SetStatus(codes.Error, err.Error())
		span.RecordError(err)
		return nil, err
	}
}

func (e *InboundEngine) authLoop(auth command.Auth, authTimeout time.Duration) {
	for {
		select {
		case <-e.core.responseChannels[response.TypeAuthRequest]:
			authCtx, cancel := context.WithTimeout(e.core.ctx, authTimeout)
			err := e.doAuth(authCtx, auth)
			cancel()
			if err != nil {
				log.Warn("Failed to auth %e\n", err)
				// Close the connection, we have the wrong password
				e.core.shutdown(true)
				return
			} else {
				log.Info("Successfully authenticated %s\n", e.core.conn.RemoteAddr())
			}
		case <-e.core.ctx.Done():
			return
		}
	}
}

func (e *InboundEngine) disconnectLoop() {
	var err error
	select {
	case res, ok := <-e.core.responseChannels[response.TypeDisconnect]:
		logx.Debug(fmt.Sprintf("esl disconnected, the res: %v", res))

		err = disconnectError
		if ok && strutil.IsStartOf(res.GetReply(), "Disconnected, goodbye.") {
			/* 正常退出 */
			err = commonCloseError
		}
		/* 连接已断开 */
		e.core.shutdown()
	case res, ok := <-e.core.responseChannels[response.TypeRudeRejection]:
		/* 连接被拒绝 */
		if ok {
			err = errors.New(res.GetReply())
		} else {
			err = errors.New(" esl connection refused")
		}

		e.core.shutdown()
	case <-e.core.ctx.Done():
		/* 未知原因退出 */
		err = commonCloseError
	}

	e.onDisconnect(err)
	return
}

func (e *InboundEngine) onEvent(event *events.Event) {
	// 首先检查是否有通用事件监听器
	if listeners, ok := e.eventListeners[events.EventAll]; ok {
		for _, listener := range listeners {
			listener(event)
		}
	}

	eventType := event.GetName()
	if eventType == events.EventCustom {
		/* 自定义事件 根据 Event-Subclass 区分 */
		eventType = fmt.Sprintf("%s %s", events.EventCustom, event.GetHeader(headers.EventSubclass))
	}
	if listeners, ok := e.eventListeners[eventType]; ok {
		for _, listener := range listeners {
			listener(event)
		}
	}
}

// BACKGROUND_JOB 回调
func (e *InboundEngine) onBackgroundJob(event *events.Event) {
	if !event.HasHeader(headers.JobUUID) {
		return
	}

	c, exists := e.jobChannels.Pop(event.GetHeader(headers.JobUUID))
	if exists {
		c <- &response.RawResponse{
			Headers: event.Headers,
			Body:    event.Body,
		}
		close(c)
	}
}

// 连接断开回调
func (e *InboundEngine) onDisconnect(err error) {
	switch err {
	case commonCloseError:
		/* 正常退出 */
	case nil, disconnectError:
		e.mu.Lock()
		defer e.mu.Unlock()
		if e.core.IsActive() {
			/* 已连接 */
			return
		}
		log.Debug("disconnected will reconnect.")

		/* 重连 */
		ticker := time.NewTicker(e.reconnectInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if err := e.startInbound(); err == nil {
					if e.onReconnected != nil {
						/* 重连回调 */
						go e.onReconnected()
					}
					return
				}
				log.Error("reconnection failed, will try again.")
			case <-e.ctx.Done():
				return
			}
		}
	default:
		log.Error("disconnected: ", err)
		return
	}
}

// 注册 bgapi job channel
func (e *InboundEngine) registerJobChannel(jobUUID string) <-chan *response.RawResponse {
	channel := make(chan *response.RawResponse, 1)
	e.jobChannels.Set(jobUUID, channel)
	return channel
}

// 注销 bgapi job channel
func (e *InboundEngine) unRegisterJobChannel(jobUUID string) {
	e.jobChannels.Remove(jobUUID)
}

// Subscribe 订阅事件
func (e *InboundEngine) Subscribe(eventType string, listener events.EventListener) string {
	e.mu.Lock()
	defer e.mu.Unlock()

	id := uuid.New().String()
	if _, ok := e.eventListeners[eventType]; ok {
		e.eventListeners[eventType][id] = listener
	} else {
		e.eventListeners[eventType] = map[string]events.EventListener{id: listener}
	}
	return id
}

// UnSubscribe 注销事件
// eventType: 事件类型
// ids: RegisterEvent 生成的listenerID
func (e *InboundEngine) UnSubscribe(eventType string, ids ...string) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if ids == nil {
		delete(e.eventListeners, eventType)
		return
	}
	if listeners, ok := e.eventListeners[eventType]; ok {
		for _, id := range ids {
			delete(listeners, id)
		}
	}
}

func (e *InboundEngine) Start() error {
	var err error
	e.startOnce.Do(func() {
		err = e.startInbound()
	})
	return err
}

func (e *InboundEngine) Close() {
	e.closeOnce.Do(func() {
		if e.core != nil {
			e.core.shutdown(true)
		}
		e.eventListeners = nil
		e.jobChannels.Clear()
		e.stopFunc()
	})
}

// WithPassword 连接密码
func WithPassword(password string) InboundOption {
	return func(e *InboundEngine) {
		e.password = password
	}
}

// WithAuthTimeout 鉴权认证超时
func WithAuthTimeout(t time.Duration) InboundOption {
	return func(e *InboundEngine) {
		e.authTimeout = t
	}
}

// WithExitTimeout 退出超时
func WithExitTimeout(t time.Duration) InboundOption {
	return func(e *InboundEngine) {
		e.cfg.exitTimeout = t
	}
}

// WithEventConcurrentSize 事件并发大小
func WithEventConcurrentSize(size uint64) InboundOption {
	return func(e *InboundEngine) {
		e.cfg.eventConcurrentSize = size
	}
}

// WithReconnectInterval 重连间隔
func WithReconnectInterval(t time.Duration) InboundOption {
	return func(e *InboundEngine) {
		e.reconnectInterval = t
	}
}

// WithOnReconnected 重连回调
func WithOnReconnected(f func()) InboundOption {
	return func(e *InboundEngine) {
		e.onReconnected = f
	}
}
